package com.fwmagic.dynamic_rule.modeltest.drools;

import com.fwmagic.dynamic_rule.functions.SourceFunctions;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
import org.kie.internal.utils.KieHelper;

import java.util.Iterator;
import java.util.Map;

/**
 * Demo job that matches a keyed event stream against dynamically broadcast Drools rules.
 *
 * <ol>
 *   <li>Reads Drools rules and applicant events from two Kafka topics.</li>
 *   <li>Broadcasts the rules and evaluates every event against each known rule.</li>
 * </ol>
 *
 * <p>Rule messages have the form {@code ruleName,drlText}: everything after the first comma is
 * compiled as DRL. Event messages have the form {@code name,age}.
 *
 * <pre>
 * kafka-topics.sh --create --topic test-drools-rules --zookeeper hd1:2181 --partitions 1 --replication-factor 1
 * kafka-topics.sh --create --topic applicant --zookeeper hd1:2181 --partitions 1 --replication-factor 1
 *
 * kafka-console-producer.sh --topic applicant --broker-list hd1:9092
 * zhangsan,20
 * </pre>
 */
@Slf4j
public class FlinkWithDroolsTest {
    /**
     * Builds and runs the local Flink pipeline.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Read the event data from Kafka.
        DataStreamSource<String> eventsStream = env.addSource(SourceFunctions.createKafkaSource("applicant"));

        // Convert each "name,age" line into an Applicant.
        // NOTE(review): a malformed line still fails the job here; switch to a flatMap if bad
        // input should be skipped instead.
        SingleOutputStreamOperator<Applicant> mapedStream = eventsStream.map(new MapFunction<String, Applicant>() {
            @Override
            public Applicant map(String s) throws Exception {
                // e.g. zhangsan,15
                String[] arr = s.split(",");
                // Trim so that "zhangsan, 15" does not fail with NumberFormatException.
                return new Applicant(arr[0], Integer.parseInt(arr[1].trim()), true);
            }
        });

        // Key the events by applicant name.
        KeyedStream<Applicant, String> keyedStream = mapedStream.keyBy(new KeySelector<Applicant, String>() {
            @Override
            public String getKey(Applicant applicant) throws Exception {
                return applicant.getName();
            }
        });

        // Read the rule data from Kafka.
        DataStreamSource<String> rulesStream = env.addSource(SourceFunctions.createKafkaSource("test-drools-rules"));

        // Broadcast the rules; the state maps rule name -> session built from that rule's DRL.
        // NOTE(review): KieSession is stored directly in Flink state; confirm it serializes
        // correctly before enabling checkpointing.
        MapStateDescriptor<String, KieSession> mapStateDescriptor =
                new MapStateDescriptor<>("rulesState", String.class, KieSession.class);
        BroadcastStream<String> broadcastStream = rulesStream.broadcast(mapStateDescriptor);

        // Connect the keyed events with the broadcast rules.
        BroadcastConnectedStream<Applicant, String> connectedStream = keyedStream.connect(broadcastStream);

        SingleOutputStreamOperator<String> process = connectedStream.process(new KeyedBroadcastProcessFunction<String, Applicant, String, String>() {

            /**
             * Evaluates one applicant against every rule currently held in broadcast state and
             * emits one verdict line per rule.
             *
             * @param applicant the event to evaluate
             * @param readOnlyContext gives read-only access to the broadcast state
             * @param collector receives one verdict line per rule
             * @throws Exception if state access or rule evaluation fails
             */
            @Override
            public void processElement(Applicant applicant, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
                // Always obtain the state from the context: a cached field would still be null
                // when an event arrives before the first rule.
                for (Map.Entry<String, KieSession> entry
                        : readOnlyContext.getBroadcastState(mapStateDescriptor).immutableEntries()) {
                    KieSession kieSession = entry.getValue();

                    // Insert the fact and fire the rules, then remove the fact again so the
                    // long-lived session's working memory does not grow with every event.
                    FactHandle handle = kieSession.insert(applicant);
                    try {
                        kieSession.fireAllRules();
                    } finally {
                        kieSession.delete(handle);
                    }

                    // Report the outcome of the rule evaluation.
                    if (applicant.isValid()) {
                        collector.collect(applicant.getName() + "| 合法。");
                    } else {
                        collector.collect(applicant.getName() + "| 不合法！");
                    }
                }
            }

            /**
             * Handles one element of the broadcast (rule) stream: compiles the DRL and stores the
             * resulting session in broadcast state under the rule name.
             *
             * @param str rule message in the form {@code ruleName,drlText}
             * @param context gives read-write access to the broadcast state
             * @param collector unused
             * @throws Exception if state access fails
             */
            @Override
            public void processBroadcastElement(String str, Context context, Collector<String> collector) throws Exception {
                log.info("收到一条规则数据:{}", str);

                // Split on the first comma only: the DRL text itself may contain commas.
                String[] split = str.split(",", 2);
                if (split.length < 2 || split[1].trim().isEmpty()) {
                    log.warn("Ignoring malformed rule message (expected 'ruleName,drl'): {}", str);
                    return;
                }
                String ruleName = split[0];
                String ruleValue = split[1];

                KieSession kieSession;
                try {
                    kieSession = new KieHelper().addContent(ruleValue, ResourceType.DRL).build().newKieSession();
                } catch (RuntimeException e) {
                    // A rule that does not compile must not take the whole job down.
                    log.error("Failed to build rule '{}'; keeping the previous version if any", ruleName, e);
                    return;
                }

                // Store the new session and release the one it replaces.
                BroadcastState<String, KieSession> broadcastState = context.getBroadcastState(mapStateDescriptor);
                KieSession previous = broadcastState.get(ruleName);
                broadcastState.put(ruleName, kieSession);
                if (previous != null) {
                    previous.dispose();
                }
            }
        });

        process.print();

        env.execute();
    }
}
